package com.atguigu.flink.sql.window;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Created by Smexy on 2023/9/16
 *
 * Hot-items Top-N demo, done in two aggregation steps:
 *   1. Aggregate: group by item id over a 1-hour event-time tumbling window and
 *      count the pv (page-view) events per item.
 *   2. Rank: partition by window end, rank items by pv count descending with
 *      ROW_NUMBER(), and keep only the top 3 items of each window.
 */
public class Demo6_TopN
{
    public static void main(String[] args) throws Exception {

        // Pure Table API job — no DataStream bridge required.
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // 1. Source table mapped onto a CSV file.
        //    et is derived from the ts column via TO_TIMESTAMP_LTZ(ts, 0):
        //    precision 0 means ts holds epoch SECONDS (not milliseconds).
        //    The watermark lags event time by 1 ms, so a window fires as soon as
        //    an event at-or-past the window end is seen.
        String createTableSql = "CREATE TABLE t1 (" +
            "  uid STRING," +
            "  itemId STRING," +
            "  cId STRING," +
            "  b STRING," +
            "  ts BIGINT , " +
            "  et AS TO_TIMESTAMP_LTZ(ts,0) ," +
            "  WATERMARK FOR et AS et - INTERVAL '0.001' SECOND " +
            ")  WITH (" +
            "  'connector' = 'filesystem',   " +
            "  'path' = 'data/UserBehavior.csv', " +
            "  'format' = 'csv'             " +
            ")";

        tableEnv.executeSql(createTableSql);

        // 2. First aggregation: windowing TVF (TUMBLE) + GROUP BY.
        //    Keeps only 'pv' behavior rows and counts page views per item
        //    within each 1-hour window.
        String tumbleSql = " select  itemId, window_start, window_end , count(*) pv" +
            " from TABLE( " +
            " TUMBLE(TABLE t1, DESCRIPTOR(et), INTERVAL '1' HOUR ) )  " +
            " where b = 'pv'  " +
            " GROUP BY window_start, window_end , itemId "
            ;

        Table t2 = tableEnv.sqlQuery(tumbleSql);
        tableEnv.createTemporaryView("t2",t2);
        /*
            3. Second aggregation: Top-N.
                Top-N = first rank the rows, then filter on the rank.

                Ranking functions are OVER() window functions.

                Hive supports rank, dense_rank, row_number(), ...
                Flink only supports row_number() for the Top-N pattern.
         */
        String topNRankSql = " select " +
                         "   itemId, window_start, window_end ,pv ," +
                         "   row_number() over(partition by window_end order by pv desc )  rk " +
                         " from t2 ";

        Table t3 = tableEnv.sqlQuery(topNRankSql);
        tableEnv.createTemporaryView("t3",t3);

        /*
            Keep the top 3.
                Without a WHERE filter on the rank, the query is a plain OVER()
                aggregation and must obey Flink's OVER() restrictions
                (time-ascending ordering only).
                With a "rk <= N" WHERE filter, Flink recognizes the Top-N pattern
                instead, and the ORDER BY may use any column, in any direction.
         */

        // 4. Sink to MySQL: (a) create the table in MySQL, (b) declare a Flink
        //    table mapped onto it. PRIMARY KEY (w_end, rk) makes the JDBC sink
        //    run in upsert mode, so ranking updates overwrite previous rows.
        //    NOTE(review): credentials are hard-coded for demo purposes only —
        //    load them from configuration in real code.
        String createSinkTableSql = "CREATE TABLE t4 (" +
            " `w_start` TIMESTAMP(3) NOT NULL ," +
            " `w_end` TIMESTAMP(3) NOT NULL ," +
            " `item_id` STRING NOT NULL," +
            " `item_count` BIGINT NOT NULL," +
            " `rk` BIGINT NOT NULL," +
            "   PRIMARY KEY (w_end,rk) NOT ENFORCED " +
            " )  WITH (" +
            "  'connector' = 'jdbc',   " +
            "  'url' = 'jdbc:mysql://hadoop102:3306/Mybatis?useSSL=false&useUnicode=true&characterEncoding=UTF-8', " +
            "  'table-name' = 'hot_item'   ," +
            "  'driver' = 'com.mysql.cj.jdbc.Driver'   ," +
            "  'username' = 'root'   ," +
            "  'password' = '000000'   " +
            " )";
        tableEnv.executeSql(createSinkTableSql);

        // executeSql(INSERT ...) submits the job ASYNCHRONOUSLY and returns at
        // once; without await() the JVM may exit before the job finishes and no
        // rows would reach MySQL in local/mini-cluster execution.
        tableEnv.executeSql("insert into t4 select window_start, window_end, itemId,pv,rk  from t3 where rk <= 3 ")
                .await();
    }
}
